跳到主要内容

Redis 缓存淘汰策略

Redis 缓存淘汰策略概述

Redis 作为内存数据库,当内存空间不足时需要通过淘汰策略来释放内存空间。合理的淘汰策略能够保证系统的高效运行,避免内存溢出问题。

Redis 支持的缓存淘汰策略

Redis 提供了 8 种缓存淘汰策略,可以分为三大类:

针对设置了过期时间的键(volatile)

# 配置示例
maxmemory-policy volatile-lru

volatile-lru(默认策略)

# 模拟 LRU 淘汰逻辑
def volatile_lru_evict():
"""
从设置了过期时间的键中,淘汰最近最少使用的键
"""
expired_keys = get_keys_with_expire() # 获取有过期时间的键
if not expired_keys:
return False # 没有可淘汰的键

lru_key = find_least_recently_used(expired_keys)
delete_key(lru_key)
return True

volatile-lfu

def volatile_lfu_evict():
"""
从设置了过期时间的键中,淘汰访问频率最低的键
"""
expired_keys = get_keys_with_expire()
if not expired_keys:
return False

lfu_key = find_least_frequently_used(expired_keys)
delete_key(lfu_key)
return True

volatile-random

def volatile_random_evict():
"""
从设置了过期时间的键中,随机淘汰一个键
"""
expired_keys = get_keys_with_expire()
if not expired_keys:
return False

random_key = random.choice(expired_keys)
delete_key(random_key)
return True

volatile-ttl

def volatile_ttl_evict():
"""
从设置了过期时间的键中,淘汰剩余TTL最短的键
"""
expired_keys = get_keys_with_expire()
if not expired_keys:
return False

ttl_key = find_shortest_ttl(expired_keys)
delete_key(ttl_key)
return True

针对所有键(allkeys)

allkeys-lru

def allkeys_lru_evict():
"""
从所有键中,淘汰最近最少使用的键
适用场景:缓存场景,遵循局部性原理
"""
all_keys = get_all_keys()
lru_key = find_least_recently_used(all_keys)
delete_key(lru_key)
return True

allkeys-lfu

def allkeys_lfu_evict():
"""
从所有键中,淘汰访问频率最低的键
适用场景:热点数据明显的业务
"""
all_keys = get_all_keys()
lfu_key = find_least_frequently_used(all_keys)
delete_key(lfu_key)
return True

allkeys-random

def allkeys_random_evict():
"""
从所有键中,随机淘汰一个键
适用场景:访问模式完全随机的业务
"""
all_keys = get_all_keys()
random_key = random.choice(all_keys)
delete_key(random_key)
return True

不淘汰策略

noeviction

def noeviction_policy():
"""
不淘汰任何键,内存满时直接返回错误
适用场景:关键数据不能丢失的业务
"""
return "OOM command not allowed when used memory > 'maxmemory'"

缓存更新策略

算法剔除策略

LRU 算法实现原理

class LRUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.order = [] # 维护访问顺序

def get(self, key):
if key in self.cache:
# 更新访问顺序
self.order.remove(key)
self.order.append(key)
return self.cache[key]
return None

def put(self, key, value):
if len(self.cache) >= self.capacity:
# 淘汰最久未使用的键
oldest = self.order.pop(0)
del self.cache[oldest]

self.cache[key] = value
self.order.append(key)

LFU 算法实现原理

class LFUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.freq = {} # 记录访问频次
self.min_freq = 0

def get(self, key):
if key in self.cache:
self.freq[key] = self.freq.get(key, 0) + 1
return self.cache[key]
return None

def evict_least_frequent(self):
# 找到频次最低的键进行淘汰
min_key = min(self.freq.keys(), key=lambda k: self.freq[k])
del self.cache[min_key]
del self.freq[min_key]

超时剔除策略

过期键删除策略

def expire_key_strategies():
"""
Redis 过期键删除的三种策略
"""

# 1. 定期删除(主动)
def scheduled_delete():
"""
每秒执行10次,每次随机检查20个设置了过期时间的键
如果过期键比例超过25%,继续检查
"""
for _ in range(20):
key = random_expire_key()
if is_expired(key):
delete_key(key)

# 2. 惰性删除(被动)
def lazy_delete(key):
"""
访问键时检查是否过期
"""
if is_expired(key):
delete_key(key)
return None
return get_value(key)

# 3. 内存压力删除
def memory_pressure_delete():
"""
内存不足时主动清理过期键
"""
if memory_usage() > memory_limit():
expired_keys = find_all_expired_keys()
for key in expired_keys:
delete_key(key)

TTL 设置最佳实践

# 设置不同业务场景的TTL
SET user:1001 "user_data" EX 3600 # 用户数据:1小时
SET session:abc123 "session_data" EX 1800 # 会话数据:30分钟
SET cache:hot_data "data" EX 300 # 热点数据:5分钟
SET temp:process_id "temp_data" EX 60 # 临时数据:1分钟

主动更新策略

Cache Aside 模式实现

class CacheAsidePattern:
def __init__(self, cache, database):
self.cache = cache
self.database = database

def get(self, key):
# 1. 先查缓存
value = self.cache.get(key)
if value is not None:
return value

# 2. 缓存未命中,查数据库
value = self.database.get(key)
if value is not None:
# 3. 写入缓存
self.cache.set(key, value, expire=3600)

return value

def update(self, key, value):
# 1. 先更新数据库
success = self.database.update(key, value)
if success:
# 2. 删除缓存(而不是更新)
self.cache.delete(key)
return success

Write Through 模式实现

class WriteThroughPattern:
def __init__(self, cache, database):
self.cache = cache
self.database = database

def write(self, key, value):
try:
# 1. 同时写入缓存和数据库
db_success = self.database.write(key, value)
if db_success:
cache_success = self.cache.set(key, value)
return cache_success
return False
except Exception as e:
# 保证数据一致性
self.cache.delete(key)
raise e

Write Behind 模式实现

class WriteBehindPattern:
def __init__(self, cache, database, queue):
self.cache = cache
self.database = database
self.queue = queue
self.dirty_keys = set() # 记录脏数据

def write(self, key, value):
# 1. 立即写入缓存
self.cache.set(key, value)

# 2. 标记为脏数据
self.dirty_keys.add(key)

# 3. 异步写入数据库
self.queue.put({
'operation': 'write',
'key': key,
'value': value
})

return True

def flush_dirty_data(self):
"""定期刷新脏数据到数据库"""
while self.dirty_keys:
batch = list(self.dirty_keys)[:100] # 批量处理
for key in batch:
value = self.cache.get(key)
if value:
self.database.write(key, value)
self.dirty_keys.remove(key)

缓存粒度控制的考量因素

数据访问模式分析

访问模式分析代码示例

class CacheGranularityAnalyzer:
def __init__(self):
self.access_patterns = {}
self.field_access_count = {}

def analyze_access_pattern(self, user_id, fields_accessed):
"""分析用户数据的访问模式"""
key = f"user:{user_id}"

# 记录字段访问频次
for field in fields_accessed:
field_key = f"{key}:{field}"
self.field_access_count[field_key] = self.field_access_count.get(field_key, 0) + 1

# 分析访问模式
total_fields = len(self.get_user_schema())
accessed_fields = len(set(fields_accessed))

if accessed_fields / total_fields > 0.8:
return "FULL_OBJECT" # 访问80%以上字段,建议粗粒度
elif accessed_fields <= 2:
return "SPECIFIC_FIELDS" # 访问少量字段,建议细粒度
else:
return "MIXED_PATTERN" # 混合模式

def recommend_cache_strategy(self, pattern):
"""根据访问模式推荐缓存策略"""
strategies = {
"FULL_OBJECT": {
"granularity": "coarse",
"cache_key": "user:{user_id}",
"ttl": 3600,
"description": "缓存完整用户对象"
},
"SPECIFIC_FIELDS": {
"granularity": "fine",
"cache_key": "user:{user_id}:{field}",
"ttl": 7200,
"description": "按字段分别缓存"
},
"MIXED_PATTERN": {
"granularity": "hybrid",
"cache_key": "混合策略",
"ttl": "根据字段重要性设置",
"description": "热点字段细粒度,完整对象粗粒度"
}
}
return strategies.get(pattern)

内存使用效率

内存消耗对比分析

def memory_efficiency_analysis():
"""
对比不同粒度策略的内存使用效率
"""

# 示例:用户对象包含10个字段
user_fields = {
'id': 8, # bytes
'name': 20, # bytes
'email': 30, # bytes
'phone': 15, # bytes
'address': 100, # bytes
'avatar': 200, # bytes
'settings': 500, # bytes
'preferences': 300,# bytes
'metadata': 1000, # bytes
'statistics': 800 # bytes
}

# 粗粒度:缓存整个对象
coarse_memory = sum(user_fields.values()) + 50 # 对象序列化开销

# 细粒度:分别缓存每个字段(假设只访问3个字段)
hot_fields = ['id', 'name', 'email']
fine_memory = sum(user_fields[field] for field in hot_fields) + 30 * 3 # 每个key的开销

print(f"粗粒度内存使用: {coarse_memory} bytes")
print(f"细粒度内存使用: {fine_memory} bytes")
print(f"内存节省: {((coarse_memory - fine_memory) / coarse_memory * 100):.1f}%")

网络传输开销

def network_overhead_analysis():
"""
分析不同粒度策略的网络开销
"""

# 粗粒度:单次请求获取所有数据
coarse_requests = 1
coarse_data_size = 2973 # 完整对象大小
coarse_overhead = coarse_requests * 100 # 每次请求的网络头开销

# 细粒度:多次请求获取需要的字段
fine_requests = 3 # 分别请求3个字段
fine_data_size = 58 # 只传输需要的字段
fine_overhead = fine_requests * 100

print(f"粗粒度: {coarse_requests}次请求, 数据{coarse_data_size}B, 开销{coarse_overhead}B")
print(f"细粒度: {fine_requests}次请求, 数据{fine_data_size}B, 开销{fine_overhead}B")

数据一致性要求

业务场景适配

电商平台缓存粒度策略

class EcommereCacheStrategy:
def __init__(self):
self.cache_configs = {
# 商品信息:读多写少,粗粒度缓存
"product": {
"granularity": "coarse",
"key_pattern": "product:{product_id}",
"ttl": 86400, # 24小时
"reason": "商品信息整体访问,更新频率低"
},

# 用户信息:部分字段高频访问,混合策略
"user": {
"granularity": "hybrid",
"hot_fields": ["id", "name", "level", "balance"],
"hot_key_pattern": "user:{user_id}:{field}",
"cold_key_pattern": "user:{user_id}:profile",
"hot_ttl": 7200, # 热点字段2小时
"cold_ttl": 3600, # 完整信息1小时
"reason": "基础信息高频访问,详细信息低频访问"
},

# 库存信息:强一致性要求,细粒度缓存
"inventory": {
"granularity": "fine",
"key_pattern": "inventory:{product_id}:{warehouse_id}",
"ttl": 300, # 5分钟
"reason": "库存变化频繁,需要精确控制"
},

# 订单信息:状态变化频繁,粗粒度缓存
"order": {
"granularity": "coarse",
"key_pattern": "order:{order_id}",
"ttl": 1800, # 30分钟
"reason": "订单状态更新时整体失效"
}
}

def get_cache_strategy(self, business_type):
"""获取业务类型对应的缓存策略"""
return self.cache_configs.get(business_type)

缓存策略选择建议

最佳实践总结

  1. 监控与调优
# 监控内存使用情况
redis-cli INFO memory

# 监控淘汰统计
redis-cli INFO stats | grep evicted

# 监控命中率
redis-cli INFO stats | grep keyspace
  1. 配置建议
# 生产环境推荐配置
maxmemory 2gb
maxmemory-policy allkeys-lru
maxmemory-samples 5

# 开启延迟监控
latency-monitor-threshold 100

通过合理选择缓存淘汰策略、更新策略和粒度控制,可以显著提升 Redis 缓存系统的性能和稳定性。在实际应用中,应该根据具体的业务场景和数据特点来选择最适合的策略组合。

Reference